d04d3ceaf9a678cd962690b8d45df5b96b150106,bridge/src/main/java/io/ppatierno/kafka/bridge/SinkBridgeEndpoint.java,KafkaConsumerRunner,run,#,211
Before Change
DeliveryOptions options = new DeliveryOptions();
options.addHeader(SinkBridgeEndpoint.EVENT_BUS_HEADER_COMMAND, SinkBridgeEndpoint.EVENT_BUS_SEND_COMMAND);
this.vertx.eventBus().send(this.ebQueue, "", options);
}
}
} catch (WakeupException e) {
After Change
DeliveryOptions options = new DeliveryOptions();
options.addHeader(SinkBridgeEndpoint.EVENT_BUS_HEADER_COMMAND, SinkBridgeEndpoint.EVENT_BUS_SEND_COMMAND);
if (this.qos == ProtonQoS.AT_MOST_ONCE) {
// Sender QoS settled (AT_MOST_ONCE) : commit immediately and start message sending
try {
// 1. immediate commit
this.consumer.commitSync();
// 2. commit ok, so we can enqueue record for sending
for (ConsumerRecord<String, byte[]> record : records) {
LOG.info("Received from Kafka partition {} [{}], key = {}, value = {}", record.partition(), record.offset(), record.key(), new String(record.value()));
this.queue.add(record);
}
// 3. start message sending
this.vertx.eventBus().send(this.ebQueue, "", options);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
// Sender QoS unsettled (AT_LEAST_ONCE) : start message sending, wait end and commit
// 1. enqueue record for sending
for (ConsumerRecord<String, byte[]> record : records) {
LOG.info("Received from Kafka partition {} [{}], key = {}, value = {}", record.partition(), record.offset(), record.key(), new String(record.value()));
this.queue.add(record);
}
// 2. start message sending
this.vertx.eventBus().send(this.ebQueue, "", options, ar -> {
// 4. commit
this.consumer.commitSync();
});
// TODO : consider timeout ??
try {